/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.io.gcp.bigquery;



import com.google.api.client.googleapis.json.GoogleJsonResponseException;

import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;

import com.google.api.client.http.HttpRequestInitializer;

import com.google.api.client.util.BackOff;

import com.google.api.client.util.BackOffUtils;

import com.google.api.client.util.ExponentialBackOff;

import com.google.api.client.util.Sleeper;

import com.google.api.gax.core.FixedCredentialsProvider;

import com.google.api.gax.rpc.FixedHeaderProvider;

import com.google.api.gax.rpc.HeaderProvider;

import com.google.api.services.bigquery.Bigquery;

import com.google.api.services.bigquery.Bigquery.Tables;

import com.google.api.services.bigquery.model.*;

import com.google.auth.Credentials;

import com.google.auth.http.HttpCredentialsAdapter;

import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageClient;

import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageSettings;

import com.google.cloud.bigquery.storage.v1beta1.Storage.CreateReadSessionRequest;

import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsRequest;

import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsResponse;

import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadSession;

import com.google.cloud.hadoop.util.ApiErrorExtractor;

import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;

import com.bff.gaia.unified.sdk.annotations.Experimental;

import com.bff.gaia.unified.sdk.extensions.gcp.auth.NullCredentialInitializer;

import com.bff.gaia.unified.sdk.extensions.gcp.options.GcsOptions;

import com.bff.gaia.unified.sdk.extensions.gcp.util.BackOffAdapter;

import com.bff.gaia.unified.sdk.extensions.gcp.util.CustomHttpErrors;

import com.bff.gaia.unified.sdk.extensions.gcp.util.RetryHttpRequestInitializer;

import com.bff.gaia.unified.sdk.extensions.gcp.util.Transport;

import com.bff.gaia.unified.sdk.options.PipelineOptions;

import com.bff.gaia.unified.sdk.transforms.SerializableFunction;

import com.bff.gaia.unified.sdk.util.FluentBackoff;

import com.bff.gaia.unified.sdk.util.ReleaseInfo;

import com.bff.gaia.unified.sdk.values.ValueInSingleWindow;

import com.bff.gaia.unified.vendor.guava.com.google.common.annotations.VisibleForTesting;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.ImmutableList;

import org.joda.time.Duration;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;



import javax.annotation.Nullable;

import java.io.IOException;

import java.util.ArrayList;

import java.util.Collection;

import java.util.List;

import java.util.concurrent.*;

import java.util.stream.Collectors;



import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkNotNull;



/**

 * An implementation of {@link BigQueryServices} that actually communicates with the cloud BigQuery

 * service.

 */

class BigQueryServicesImpl implements BigQueryServices {



  private static final Logger LOG = LoggerFactory.getLogger(BigQueryServicesImpl.class);



  // How frequently to log while polling.
  // NOTE(review): not referenced in the visible part of this file - confirm it is still used.
  private static final Duration POLLING_LOG_GAP = Duration.standardMinutes(10);

  // The maximum number of retries to execute a BigQuery RPC. Also interpolated into the
  // "aborting after N retries" failure messages below.
  private static final int MAX_RPC_RETRIES = 9;

  // The initial backoff for executing a BigQuery RPC.
  private static final Duration INITIAL_RPC_BACKOFF = Duration.standardSeconds(1);

  // The initial backoff for polling the status of a BigQuery job.
  private static final Duration INITIAL_JOB_STATUS_POLL_BACKOFF = Duration.standardSeconds(1);

  // Shared factory for RPC backoffs: MAX_RPC_RETRIES retries starting at INITIAL_RPC_BACKOFF.
  // createDefaultBackoff() asks it for a fresh backoff instance on every call.
  private static final FluentBackoff DEFAULT_BACKOFF_FACTORY =
      FluentBackoff.DEFAULT.withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF);



  /** Returns a {@link JobService} whose BigQuery client is built from {@code options}. */
  @Override
  public JobService getJobService(BigQueryOptions options) {
    final JobService jobService = new JobServiceImpl(options);
    return jobService;
  }



  /** Returns a {@link DatasetService} whose BigQuery client is built from {@code options}. */
  @Override
  public DatasetService getDatasetService(BigQueryOptions options) {
    final DatasetService datasetService = new DatasetServiceImpl(options);
    return datasetService;
  }



  /**
   * Returns a {@link StorageClient} configured from {@code options}.
   *
   * @throws IOException if the underlying client cannot be created.
   */
  @Override
  public StorageClient getStorageClient(BigQueryOptions options) throws IOException {
    final StorageClient storageClient = new StorageClientImpl(options);
    return storageClient;
  }



  /**
   * Creates a new, independent backoff from {@code DEFAULT_BACKOFF_FACTORY}, adapted to the
   * Google API client {@link BackOff} interface.
   */
  private static BackOff createDefaultBackoff() {
    final FluentBackoff factory = DEFAULT_BACKOFF_FACTORY;
    return BackOffAdapter.toGcpBackOff(factory.backoff());
  }



  @VisibleForTesting

  static class JobServiceImpl implements BigQueryServices.JobService {

    // Classifies API failures (e.g. "already exists", "not found") for the retry loops below.
    private final ApiErrorExtractor errorExtractor;
    // BigQuery API client used for every job RPC issued by this service.
    private final Bigquery client;



    /** Creates a job service around an already-built {@code client}. */
    @VisibleForTesting
    JobServiceImpl(Bigquery client) {
      this.client = client;
      this.errorExtractor = new ApiErrorExtractor();
    }



    /** Creates a job service whose client is built from {@code options}. */
    private JobServiceImpl(BigQueryOptions options) {
      this(newBigQueryClient(options).build());
    }



    /**
     * {@inheritDoc}
     *
     * <p>Wraps {@code loadConfig} in a {@link Job} and submits it through {@code startJob}, which
     * retries the insert with the default RPC backoff.
     *
     * @throws IOException if the job could not be inserted before the retries were exhausted.
     */
    @Override
    public void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig)
        throws InterruptedException, IOException {
      JobConfiguration configuration = new JobConfiguration().setLoad(loadConfig);
      Job loadJob = new Job().setJobReference(jobRef).setConfiguration(configuration);
      startJob(loadJob, errorExtractor, client);
    }



    /**
     * {@inheritDoc}
     *
     * <p>Wraps {@code extractConfig} in a {@link Job} and submits it through {@code startJob},
     * which retries the insert with the default RPC backoff.
     *
     * @throws IOException if the job could not be inserted before the retries were exhausted.
     */
    @Override
    public void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig)
        throws InterruptedException, IOException {
      JobConfiguration configuration = new JobConfiguration().setExtract(extractConfig);
      Job extractJob = new Job().setJobReference(jobRef).setConfiguration(configuration);
      startJob(extractJob, errorExtractor, client);
    }



    /**
     * {@inheritDoc}
     *
     * <p>Wraps {@code queryConfig} in a {@link Job} and submits it through {@code startJob}, which
     * retries the insert with the default RPC backoff.
     *
     * @throws IOException if the job could not be inserted before the retries were exhausted.
     */
    @Override
    public void startQueryJob(JobReference jobRef, JobConfigurationQuery queryConfig)
        throws IOException, InterruptedException {
      JobConfiguration configuration = new JobConfiguration().setQuery(queryConfig);
      Job queryJob = new Job().setJobReference(jobRef).setConfiguration(configuration);
      startJob(queryJob, errorExtractor, client);
    }



    /**
     * {@inheritDoc}
     *
     * <p>Wraps {@code copyConfig} in a {@link Job} and submits it through {@code startJob}, which
     * retries the insert with the default RPC backoff.
     *
     * @throws IOException if the job could not be inserted before the retries were exhausted.
     */
    @Override
    public void startCopyJob(JobReference jobRef, JobConfigurationTableCopy copyConfig)
        throws IOException, InterruptedException {
      JobConfiguration configuration = new JobConfiguration().setCopy(copyConfig);
      Job copyJob = new Job().setJobReference(jobRef).setConfiguration(configuration);
      startJob(copyJob, errorExtractor, client);
    }



    /** Inserts {@code job} using the real sleeper and a fresh default RPC backoff. */
    private static void startJob(Job job, ApiErrorExtractor errorExtractor, Bigquery client)
        throws IOException, InterruptedException {
      BackOff defaultBackoff = createDefaultBackoff();
      startJob(job, errorExtractor, client, Sleeper.DEFAULT, defaultBackoff);
    }



    /**
     * Inserts {@code job} into BigQuery, retrying on {@link IOException} until the insert
     * succeeds or {@code backoff} is exhausted.
     *
     * <p>An "already exists" failure is treated as success: it means a previous attempt (possibly
     * one whose response was lost) already created the job.
     *
     * @param job the job to insert; its job reference supplies the project and job id.
     * @param errorExtractor used to recognise "already exists" failures.
     * @param client the BigQuery client to issue the insert with.
     * @param sleeper used to wait between attempts.
     * @param backoff controls the delay between attempts and when to give up.
     * @throws IOException if every attempt failed; the last failure is attached as the cause.
     * @throws InterruptedException if interrupted while waiting between attempts.
     */
    @VisibleForTesting
    static void startJob(
        Job job,
        ApiErrorExtractor errorExtractor,
        Bigquery client,
        Sleeper sleeper,
        BackOff backoff)
        throws IOException, InterruptedException {
      JobReference jobRef = job.getJobReference();
      // Always assigned before use: the loop body runs at least once and only falls through to
      // the final throw after a failed attempt.
      Exception lastException;
      do {
        try {
          client.jobs().insert(jobRef.getProjectId(), job).execute();
          LOG.info(
              "Started BigQuery job: {}.\n{}",
              jobRef,
              formatBqStatusCommand(jobRef.getProjectId(), jobRef.getJobId()));
          return; // SUCCEEDED
        } catch (IOException e) {
          if (errorExtractor.itemAlreadyExists(e)) {
            LOG.info("BigQuery job {} already exists, will not retry inserting it:", jobRef, e);
            return; // SUCCEEDED
          }
          // ignore and retry
          LOG.info("Failed to insert job {}, will retry:", jobRef, e);
          lastException = e;
        }
      } while (nextBackOff(sleeper, backoff));
      // The message reports MAX_RPC_RETRIES, which matches the default backoff; a caller-supplied
      // backoff may have allowed a different number of attempts.
      throw new IOException(
          String.format(
              "Unable to insert job: %s, aborting after %d retries.",
              jobRef.getJobId(), MAX_RPC_RETRIES),
          lastException);
    }



    /**
     * Polls the status of {@code jobRef} with the real sleeper, using a backoff that starts at
     * {@code INITIAL_JOB_STATUS_POLL_BACKOFF}, is capped at one minute, and allows
     * {@code maxAttempts} retries.
     */
    @Override
    public Job pollJob(JobReference jobRef, int maxAttempts) throws InterruptedException {
      FluentBackoff pollingPolicy =
          FluentBackoff.DEFAULT
              .withMaxRetries(maxAttempts)
              .withInitialBackoff(INITIAL_JOB_STATUS_POLL_BACKOFF)
              .withMaxBackoff(Duration.standardMinutes(1));
      BackOff pollingBackoff = BackOffAdapter.toGcpBackOff(pollingPolicy.backoff());
      return pollJob(jobRef, Sleeper.DEFAULT, pollingBackoff);
    }



    /**
     * Repeatedly fetches {@code jobRef} until it reports state {@code DONE} or {@code backoff} is
     * exhausted.
     *
     * <p>A missing job, a job without a status, a job that is not yet done and any
     * {@link IOException} all lead to another attempt after the next backoff interval.
     *
     * @return the finished job, or {@code null} if the backoff ran out first.
     * @throws InterruptedException if interrupted while waiting between attempts.
     */
    @VisibleForTesting
    Job pollJob(JobReference jobRef, Sleeper sleeper, BackOff backoff) throws InterruptedException {
      do {
        try {
          Job polled =
              client
                  .jobs()
                  .get(jobRef.getProjectId(), jobRef.getJobId())
                  .setLocation(jobRef.getLocation())
                  .execute();
          JobStatus jobStatus = (polled == null) ? null : polled.getStatus();
          if (polled == null) {
            LOG.info("Still waiting for BigQuery job {} to start", jobRef);
          } else if (jobStatus == null) {
            LOG.info("Still waiting for BigQuery job {} to enter pending state", jobRef);
          } else if ("DONE".equals(jobStatus.getState())) {
            LOG.info("BigQuery job {} completed in state DONE", jobRef);
            return polled;
          } else {
            // Not finished yet: log how to inspect the job manually, then wait and poll again.
            LOG.info(
                "Still waiting for BigQuery job {}, currently in status {}\n{}",
                jobRef.getJobId(),
                jobStatus,
                formatBqStatusCommand(jobRef.getProjectId(), jobRef.getJobId()));
          }
        } catch (IOException e) {
          // Polling failures are not fatal; try again after the next backoff interval.
          LOG.info("Ignore the error and retry polling job status.", e);
        }
      } while (nextBackOff(sleeper, backoff));
      LOG.warn("Unable to poll job status: {}, aborting after reached max .", jobRef.getJobId());
      return null;
    }



    /** Builds the {@code bq} command line that shows the given job as pretty-printed JSON. */
    private static String formatBqStatusCommand(String projectId, String jobId) {
      return "bq show -j --format=prettyjson --project_id=" + projectId + " " + jobId;
    }



    /**
     * Submits {@code queryConfig} as a dry-run job in {@code projectId} / {@code location} and
     * returns the statistics of the resulting job. The insert is retried on every
     * {@link IOException} with the default RPC backoff.
     *
     * @throws IOException if the dry run still fails once the retries are exhausted.
     */
    @Override
    public JobStatistics dryRunQuery(
        String projectId, JobConfigurationQuery queryConfig, String location)
        throws InterruptedException, IOException {
      JobConfiguration dryRunConfig =
          new JobConfiguration().setQuery(queryConfig).setDryRun(true);
      JobReference jobRef = new JobReference().setLocation(location).setProjectId(projectId);
      Job dryRunJob = new Job().setJobReference(jobRef).setConfiguration(dryRunConfig);
      String failureMessage =
          String.format(
              "Unable to dry run query: %s, aborting after %d retries.",
              queryConfig, MAX_RPC_RETRIES);
      Job result =
          executeWithRetries(
              client.jobs().insert(projectId, dryRunJob),
              failureMessage,
              Sleeper.DEFAULT,
              createDefaultBackoff(),
              ALWAYS_RETRY);
      return result.getStatistics();
    }



    /**
     * {@inheritDoc}
     *
     * <p>Looks the job up with the real sleeper and a fresh default RPC backoff
     * ({@code MAX_RPC_RETRIES} retries).
     *
     * @throws IOException if the lookup still fails once the retries are exhausted.
     */
    @Override
    public Job getJob(JobReference jobRef) throws IOException, InterruptedException {
      BackOff defaultBackoff = createDefaultBackoff();
      return getJob(jobRef, Sleeper.DEFAULT, defaultBackoff);
    }



    /**
     * Fetches the job identified by {@code jobRef}, retrying on failure until {@code backoff} is
     * exhausted.
     *
     * <p>The job's location is forwarded to the request, as {@code pollJob} does, so that jobs
     * running outside the default location can be found.
     *
     * @param jobRef identifies the job (project, job id and, optionally, location).
     * @param sleeper used to wait between attempts.
     * @param backoff controls the delay between attempts and when to give up.
     * @return the job, or {@code null} if the service reports that no such job exists.
     * @throws IOException if every attempt failed; the last failure is attached as the cause.
     * @throws InterruptedException if interrupted while waiting between attempts.
     */
    @VisibleForTesting
    public Job getJob(JobReference jobRef, Sleeper sleeper, BackOff backoff)
        throws IOException, InterruptedException {
      String jobId = jobRef.getJobId();
      // Always assigned before use: the final throw is only reached after a failed attempt.
      Exception lastException;
      do {
        try {
          return client
              .jobs()
              .get(jobRef.getProjectId(), jobId)
              .setLocation(jobRef.getLocation())
              .execute();
        } catch (GoogleJsonResponseException e) {
          if (errorExtractor.itemNotFound(e)) {
            // "Not found" is a definitive answer, not a transient failure: do not retry.
            LOG.info("No BigQuery job with job id {} found.", jobId);
            return null;
          }
          LOG.info(
              "Ignoring the error encountered while trying to query the BigQuery job {}", jobId, e);
          lastException = e;
        } catch (IOException e) {
          LOG.info(
              "Ignoring the error encountered while trying to query the BigQuery job {}", jobId, e);
          lastException = e;
        }
      } while (nextBackOff(sleeper, backoff));
      throw new IOException(
          String.format(
              "Unable to find BigQuery job: %s, aborting after %d retries.",
              jobRef, MAX_RPC_RETRIES),
          lastException);
    }

  }



  @VisibleForTesting

  static class DatasetServiceImpl implements DatasetService {

    // Backoff used by the public insertAll between rounds of re-submitting failed rows:
    // 200 ms initial backoff, at most 5 retries.
    private static final FluentBackoff INSERT_BACKOFF_FACTORY =
        FluentBackoff.DEFAULT.withInitialBackoff(Duration.millis(200)).withMaxRetries(5);

    // A backoff for rate limit exceeded errors. Retries forever.
    // NOTE(review): no retry limit is set here, so the effective limit is whatever
    // FluentBackoff.DEFAULT provides - confirm it is unbounded in practice.
    private static final FluentBackoff RATE_LIMIT_BACKOFF_FACTORY =
        FluentBackoff.DEFAULT
            .withInitialBackoff(Duration.standardSeconds(1))
            .withMaxBackoff(Duration.standardMinutes(2));

    // Classifies API failures (e.g. "already exists", "not found").
    private final ApiErrorExtractor errorExtractor;
    // BigQuery API client used for every dataset/table RPC issued by this service.
    private final Bigquery client;
    // Pipeline options; read in insertAll to build the insert executor.
    private final PipelineOptions options;
    // Maximum number of rows placed in a single insertAll request.
    private final long maxRowsPerBatch;
    // Size threshold for a single insertAll request, compared against the summed
    // TableRow.toString() lengths of the batched rows.
    private final long maxRowBatchSize;

    // Executor that runs insertAll requests; created on the first insertAll call.
    private ExecutorService executor;



    @VisibleForTesting

    DatasetServiceImpl(Bigquery client, PipelineOptions options) {

      BigQueryOptions bqOptions = options.as(BigQueryOptions.class);

      this.errorExtractor = new ApiErrorExtractor();

      this.client = client;

      this.options = options;

      this.maxRowsPerBatch = bqOptions.getMaxStreamingRowsToBatch();

      this.maxRowBatchSize = bqOptions.getMaxStreamingBatchSize();

      this.executor = null;

    }



    @VisibleForTesting

    DatasetServiceImpl(Bigquery client, PipelineOptions options, long maxRowsPerBatch) {

      BigQueryOptions bqOptions = options.as(BigQueryOptions.class);

      this.errorExtractor = new ApiErrorExtractor();

      this.client = client;

      this.options = options;

      this.maxRowsPerBatch = maxRowsPerBatch;

      this.maxRowBatchSize = bqOptions.getMaxStreamingBatchSize();

      this.executor = null;

    }



    /** Creates a dataset service whose client and batch limits all come from {@code bqOptions}. */
    private DatasetServiceImpl(BigQueryOptions bqOptions) {
      this.errorExtractor = new ApiErrorExtractor();
      this.client = newBigQueryClient(bqOptions).build();
      this.options = bqOptions;
      this.executor = null;
      this.maxRowsPerBatch = bqOptions.getMaxStreamingRowsToBatch();
      this.maxRowBatchSize = bqOptions.getMaxStreamingBatchSize();
    }



    /**
     * {@inheritDoc}
     *
     * <p>Fetches the table without any field selection; see the overload taking
     * {@code selectedFields}.
     *
     * @throws IOException if the lookup still fails once the retries are exhausted.
     */
    @Override
    @Nullable
    public Table getTable(TableReference tableRef) throws IOException, InterruptedException {
      final List<String> noFieldSelection = null;
      return getTable(tableRef, noFieldSelection);
    }



    /**
     * Fetches the table using the real sleeper and a fresh default RPC backoff, optionally
     * restricted to {@code selectedFields}.
     */
    @Override
    @Nullable
    public Table getTable(TableReference tableRef, List<String> selectedFields)
        throws IOException, InterruptedException {
      BackOff defaultBackoff = createDefaultBackoff();
      return getTable(tableRef, selectedFields, defaultBackoff, Sleeper.DEFAULT);
    }



    /**
     * Fetches the table {@code ref}, retrying transient failures according to {@code backoff}.
     *
     * @param selectedFields when non-null and non-empty, joined with commas and sent as the
     *     request's selected fields.
     * @return the table, or {@code null} if the failure is classified as "not found".
     * @throws IOException for any other failure that survives the retries.
     */
    @VisibleForTesting
    @Nullable
    Table getTable(
        TableReference ref, @Nullable List<String> selectedFields, BackOff backoff, Sleeper sleeper)
        throws IOException, InterruptedException {
      Tables.Get request =
          client.tables().get(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
      boolean hasFieldSelection = selectedFields != null && !selectedFields.isEmpty();
      if (hasFieldSelection) {
        request.setSelectedFields(String.join(",", selectedFields));
      }
      String failureMessage =
          String.format(
              "Unable to get table: %s, aborting after %d retries.",
              ref.getTableId(), MAX_RPC_RETRIES);
      try {
        return executeWithRetries(request, failureMessage, sleeper, backoff, DONT_RETRY_NOT_FOUND);
      } catch (IOException e) {
        if (!errorExtractor.itemNotFound(e)) {
          throw e;
        }
        // A missing table is reported to the caller as null rather than as an error.
        return null;
      }
    }



    /**
     * Retry table creation up to 5 minutes (with exponential backoff) when this user is near the
     * quota for table creation. This relatively innocuous behavior can happen when BigQueryIO is
     * configured with a table spec function to use different tables for each window.
     *
     * <p>Used by {@code createTable} as the maximum elapsed time of its exponential backoff.
     */
    private static final int RETRY_CREATE_TABLE_DURATION_MILLIS =
        (int) TimeUnit.MINUTES.toMillis(5);



    /**
     * {@inheritDoc}
     *
     * <p>Returns normally when a table with the same name already exists in the dataset; the
     * existing table is left untouched and may have a different schema than {@code table}.
     * Rate-limit failures are retried with an exponential backoff bounded by
     * {@code RETRY_CREATE_TABLE_DURATION_MILLIS}.
     *
     * @throws IOException if any error other than "table already exists" occurs.
     */
    @Override
    public void createTable(Table table) throws InterruptedException, IOException {
      String tableSpec = BigQueryHelpers.toTableSpec(table.getTableReference());
      LOG.info("Trying to create BigQuery table: {}", tableSpec);

      ExponentialBackOff.Builder backoffBuilder = new ExponentialBackOff.Builder();
      backoffBuilder.setMaxElapsedTimeMillis(RETRY_CREATE_TABLE_DURATION_MILLIS);
      BackOff quotaBackoff = backoffBuilder.build();

      tryCreateTable(table, quotaBackoff, Sleeper.DEFAULT);
    }



    /**
     * Inserts {@code table}, retrying only rate-limit failures for as long as {@code backoff}
     * allows.
     *
     * @return the created table, or {@code null} if a table with that name already exists.
     * @throws IOException the insert failure, if it is neither "already exists" nor a rate-limit
     *     error, if the backoff is exhausted, or if the thread is interrupted while waiting (the
     *     interrupt flag is restored in that case).
     */
    @VisibleForTesting
    @Nullable
    Table tryCreateTable(Table table, BackOff backoff, Sleeper sleeper) throws IOException {
      boolean loggedQuotaRetry = false;
      while (true) {
        try {
          return client
              .tables()
              .insert(
                  table.getTableReference().getProjectId(),
                  table.getTableReference().getDatasetId(),
                  table)
              .execute();
        } catch (IOException insertFailure) {
          ApiErrorExtractor extractor = new ApiErrorExtractor();
          if (extractor.itemAlreadyExists(insertFailure)) {
            // The table already exists, nothing to return.
            return null;
          }
          if (!extractor.rateLimited(insertFailure)) {
            throw insertFailure;
          }

          // The request failed because we hit a temporary quota. Back off and try again.
          boolean mayRetry;
          try {
            mayRetry = BackOffUtils.next(sleeper, backoff);
          } catch (InterruptedException interrupted) {
            // Restore interrupted state and throw the last failure.
            Thread.currentThread().interrupt();
            throw insertFailure;
          }
          if (!mayRetry) {
            throw insertFailure;
          }
          if (!loggedQuotaRetry) {
            // Log the quota condition once per call, not once per retry.
            LOG.info(
                "Quota limit reached when creating table {}:{}.{}, retrying up to {} minutes",
                table.getTableReference().getProjectId(),
                table.getTableReference().getDatasetId(),
                table.getTableReference().getTableId(),
                TimeUnit.MILLISECONDS.toSeconds(RETRY_CREATE_TABLE_DURATION_MILLIS) / 60.0);
            loggedQuotaRetry = true;
          }
        }
      }
    }



    /**
     * {@inheritDoc}
     *
     * <p>Issues the delete with the default RPC backoff, retrying on every {@link IOException}.
     *
     * @throws IOException if the delete still fails once the retries are exhausted.
     */
    @Override
    public void deleteTable(TableReference tableRef) throws IOException, InterruptedException {
      String failureMessage =
          String.format(
              "Unable to delete table: %s, aborting after %d retries.",
              tableRef.getTableId(), MAX_RPC_RETRIES);
      executeWithRetries(
          client
              .tables()
              .delete(tableRef.getProjectId(), tableRef.getDatasetId(), tableRef.getTableId()),
          failureMessage,
          Sleeper.DEFAULT,
          createDefaultBackoff(),
          ALWAYS_RETRY);
    }



    /** Checks whether the table has no rows, using the default RPC backoff and real sleeper. */
    @Override
    public boolean isTableEmpty(TableReference tableRef) throws IOException, InterruptedException {
      BackOff defaultBackoff = createDefaultBackoff();
      return isTableEmpty(tableRef, defaultBackoff, Sleeper.DEFAULT);
    }



    /**
     * Lists the table's data and reports whether the response carries no rows (a {@code null} or
     * empty row list). "Not found" failures are not retried.
     *
     * @throws IOException if the listing fails.
     */
    @VisibleForTesting
    boolean isTableEmpty(TableReference tableRef, BackOff backoff, Sleeper sleeper)
        throws IOException, InterruptedException {
      String failureMessage =
          String.format(
              "Unable to list table data: %s, aborting after %d retries.",
              tableRef.getTableId(), MAX_RPC_RETRIES);
      TableDataList listing =
          executeWithRetries(
              client
                  .tabledata()
                  .list(tableRef.getProjectId(), tableRef.getDatasetId(), tableRef.getTableId()),
              failureMessage,
              sleeper,
              backoff,
              DONT_RETRY_NOT_FOUND);
      List<TableRow> rows = listing.getRows();
      return rows == null || rows.isEmpty();
    }



    /**
     * {@inheritDoc}
     *
     * <p>Fetches the dataset with the default RPC backoff; "not found" failures are not retried.
     *
     * @throws IOException if the lookup fails.
     */
    @Override
    public Dataset getDataset(String projectId, String datasetId)
        throws IOException, InterruptedException {
      String failureMessage =
          String.format(
              "Unable to get dataset: %s, aborting after %d retries.", datasetId, MAX_RPC_RETRIES);
      return executeWithRetries(
          client.datasets().get(projectId, datasetId),
          failureMessage,
          Sleeper.DEFAULT,
          createDefaultBackoff(),
          DONT_RETRY_NOT_FOUND);
    }



    /**
     * {@inheritDoc}
     *
     * <p>Creates the dataset using the real sleeper and a fresh default RPC backoff.
     *
     * @throws IOException if the dataset could not be created before the retries were exhausted.
     */
    @Override
    public void createDataset(
        String projectId,
        String datasetId,
        @Nullable String location,
        @Nullable String description,
        @Nullable Long defaultTableExpirationMs)
        throws IOException, InterruptedException {
      BackOff defaultBackoff = createDefaultBackoff();
      createDataset(
          projectId,
          datasetId,
          location,
          description,
          defaultTableExpirationMs,
          Sleeper.DEFAULT,
          defaultBackoff);
    }



    /**
     * Creates the dataset {@code projectId:datasetId}, retrying on failure until the insert
     * succeeds or {@code backoff} is exhausted.
     *
     * <p>An "already exists" response is treated as success. Optional attributes are only set on
     * the request when non-null; {@code description} is used for both the friendly name and the
     * description of the dataset.
     *
     * @param sleeper used to wait between attempts.
     * @param backoff controls the delay between attempts and when to give up.
     * @throws IOException if every attempt failed; the last failure is attached as the cause.
     * @throws InterruptedException if interrupted while waiting between attempts.
     */
    private void createDataset(
        String projectId,
        String datasetId,
        @Nullable String location,
        @Nullable String description,
        @Nullable Long defaultTableExpirationMs,
        Sleeper sleeper,
        BackOff backoff)
        throws IOException, InterruptedException {
      DatasetReference datasetRef =
          new DatasetReference().setProjectId(projectId).setDatasetId(datasetId);

      Dataset dataset = new Dataset().setDatasetReference(datasetRef);
      if (location != null) {
        dataset.setLocation(location);
      }
      if (description != null) {
        dataset.setFriendlyName(description);
        dataset.setDescription(description);
      }
      if (defaultTableExpirationMs != null) {
        dataset.setDefaultTableExpirationMs(defaultTableExpirationMs);
      }

      // Always assigned before use: the final throw is only reached after a failed attempt.
      Exception lastException;
      do {
        try {
          client.datasets().insert(projectId, dataset).execute();
          return; // SUCCEEDED
        } catch (GoogleJsonResponseException e) {
          if (errorExtractor.itemAlreadyExists(e)) {
            return; // SUCCEEDED
          }
          // ignore and retry
          LOG.info("Ignore the error and retry creating the dataset.", e);
          lastException = e;
        } catch (IOException e) {
          LOG.info("Ignore the error and retry creating the dataset.", e);
          lastException = e;
        }
      } while (nextBackOff(sleeper, backoff));
      throw new IOException(
          String.format(
              "Unable to create dataset: %s, aborting after %d retries.",
              datasetId, MAX_RPC_RETRIES),
          lastException);
    }



    /**
     * {@inheritDoc}
     *
     * <p>Issues the delete with the default RPC backoff ({@code MAX_RPC_RETRIES} retries),
     * retrying on every {@link IOException}.
     *
     * @throws IOException if the delete still fails once the retries are exhausted.
     */
    @Override
    public void deleteDataset(String projectId, String datasetId)
        throws IOException, InterruptedException {
      executeWithRetries(
          client.datasets().delete(projectId, datasetId),
          String.format(
              "Unable to delete dataset: %s, aborting after %d retries.",
              datasetId, MAX_RPC_RETRIES),
          Sleeper.DEFAULT,
          createDefaultBackoff(),
          ALWAYS_RETRY);
    }



    /**
     * Streams {@code rowList} into the table {@code ref} in batches, re-submitting rows that
     * BigQuery rejected for as long as {@code retryPolicy} and {@code backoff} allow.
     *
     * <p>Rows are grouped into requests that are closed once the accumulated
     * {@code TableRow.toString()} length reaches {@code maxRowBatchSize}, the row count reaches
     * {@code maxRowsPerBatch}, or the last row has been added. Each request runs on the shared
     * executor and is itself retried on every {@link IOException} using
     * {@code RATE_LIMIT_BACKOFF_FACTORY}. Per-row insert errors are then either queued for
     * another round (when {@code retryPolicy} says so) or handed to {@code errorContainer}
     * together with {@code failedInserts}.
     *
     * @param ref the destination table; must not be null.
     * @param rowList the rows to insert.
     * @param insertIdList optional insert ids, one per row and in the same order as
     *     {@code rowList}.
     * @param backoff controls the delay between rounds of re-submitting failed rows and when to
     *     stop.
     * @param sleeper used for all waiting, both between rounds and inside each request's retry
     *     loop.
     * @param retryPolicy decides per insert error whether the row is retried.
     * @param failedInserts receives, through {@code errorContainer}, rows that will not be
     *     retried.
     * @param errorContainer records non-retried insert errors.
     * @param skipInvalidRows forwarded to the insert request.
     * @param ignoreUnkownValues forwarded to the insert request as "ignore unknown values".
     * @return the summed {@code TableRow.toString()} length of every row submitted, counting a
     *     row again each time it is re-submitted.
     * @throws IOException if an insert error carries no row index, if retryable errors remain
     *     when {@code backoff} stops, or if the calling thread is interrupted while waiting.
     */
    @VisibleForTesting
    <T> long insertAll(
        TableReference ref,
        List<ValueInSingleWindow<TableRow>> rowList,
        @Nullable List<String> insertIdList,
        BackOff backoff,
        final Sleeper sleeper,
        InsertRetryPolicy retryPolicy,
        List<ValueInSingleWindow<T>> failedInserts,
        ErrorContainer<T> errorContainer,
        boolean skipInvalidRows,
        boolean ignoreUnkownValues)
        throws IOException, InterruptedException {
      checkNotNull(ref, "ref");
      if (executor == null) {
        // Created on first use and then kept for later calls on this service instance.
        this.executor =
            new BoundedExecutorService(
                options.as(GcsOptions.class).getExecutorService(),
                options.as(BigQueryOptions.class).getInsertBundleParallelism());
      }
      if (insertIdList != null && rowList.size() != insertIdList.size()) {
        throw new AssertionError(
            "If insertIdList is not null it needs to have exactly "
                + "as many elements as rowList");
      }

      long retTotalDataSize = 0;
      List<TableDataInsertAllResponse.InsertErrors> allErrors = new ArrayList<>();
      // These lists contain the rows to publish. Initially they contain the entire list.
      // If there are failures, they will contain only the failed rows to be retried.
      List<ValueInSingleWindow<TableRow>> rowsToPublish = rowList;
      List<String> idsToPublish = insertIdList;
      while (true) {
        List<ValueInSingleWindow<TableRow>> retryRows = new ArrayList<>();
        List<String> retryIds = (idsToPublish != null) ? new ArrayList<>() : null;

        // Index into rowsToPublish of the first row of the batch currently being assembled.
        int strideIndex = 0;
        // Upload in batches.
        List<TableDataInsertAllRequest.Rows> rows = new ArrayList<>();
        int dataSize = 0;

        // futures.get(k) is the outcome of the k-th batch; strideIndices.get(k) is the offset
        // of that batch's first row, needed to map per-batch error indices back to rows.
        List<Future<List<TableDataInsertAllResponse.InsertErrors>>> futures = new ArrayList<>();
        List<Integer> strideIndices = new ArrayList<>();

        for (int i = 0; i < rowsToPublish.size(); ++i) {
          TableRow row = rowsToPublish.get(i).getValue();
          TableDataInsertAllRequest.Rows out = new TableDataInsertAllRequest.Rows();
          if (idsToPublish != null) {
            out.setInsertId(idsToPublish.get(i));
          }
          out.setJson(row.getUnknownKeys());
          rows.add(out);

          dataSize += row.toString().length();
          if (dataSize >= maxRowBatchSize
              || rows.size() >= maxRowsPerBatch
              || i == rowsToPublish.size() - 1) {
            TableDataInsertAllRequest content = new TableDataInsertAllRequest();
            content.setRows(rows);
            content.setSkipInvalidRows(skipInvalidRows);
            content.setIgnoreUnknownValues(ignoreUnkownValues);

            final Bigquery.Tabledata.InsertAll insert =
                client
                    .tabledata()
                    .insertAll(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(), content);

            futures.add(
                executor.submit(
                    () -> {
                      // A backoff for rate limit exceeded errors. Retries forever.
                      BackOff backoff1 =
                          BackOffAdapter.toGcpBackOff(RATE_LIMIT_BACKOFF_FACTORY.backoff());
                      while (true) {
                        try {
                          return insert.execute().getInsertErrors();
                        } catch (IOException e) {
                          LOG.info(
                              String.format(
                                  "BigQuery insertAll error, retrying: %s",
                                  ApiErrorExtractor.INSTANCE.getErrorMessage(e)));
                          try {
                            sleeper.sleep(backoff1.nextBackOffMillis());
                          } catch (InterruptedException interrupted) {
                            // Keep the interruption as the cause so it is not lost to callers.
                            throw new IOException(
                                "Interrupted while waiting before retrying insertAll",
                                interrupted);
                          }
                        }
                      }
                    }));
            strideIndices.add(strideIndex);

            retTotalDataSize += dataSize;

            // Start a new batch at the next row.
            dataSize = 0;
            strideIndex = i + 1;
            rows = new ArrayList<>();
          }
        }

        try {
          for (int i = 0; i < futures.size(); i++) {
            List<TableDataInsertAllResponse.InsertErrors> errors = futures.get(i).get();
            if (errors == null) {
              continue;
            }
            for (TableDataInsertAllResponse.InsertErrors error : errors) {
              if (error.getIndex() == null) {
                throw new IOException("Insert failed: " + error + ", other errors: " + allErrors);
              }

              // Error indices are relative to their batch; shift by the batch's first row.
              int errorIndex = error.getIndex().intValue() + strideIndices.get(i);
              if (retryPolicy.shouldRetry(new InsertRetryPolicy.Context(error))) {
                allErrors.add(error);
                retryRows.add(rowsToPublish.get(errorIndex));
                if (retryIds != null) {
                  retryIds.add(idsToPublish.get(errorIndex));
                }
              } else {
                errorContainer.add(failedInserts, error, ref, rowsToPublish.get(errorIndex));
              }
            }
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IOException("Interrupted while inserting " + rowsToPublish, e);
        } catch (ExecutionException e) {
          throw new RuntimeException(e.getCause());
        }

        if (allErrors.isEmpty()) {
          break;
        }
        long nextBackoffMillis = backoff.nextBackOffMillis();
        if (nextBackoffMillis == BackOff.STOP) {
          // Out of retries: leave allErrors populated so the failure is reported below.
          break;
        }
        try {
          sleeper.sleep(nextBackoffMillis);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IOException(
              "Interrupted while waiting before retrying insert of " + retryRows, e);
        }
        rowsToPublish = retryRows;
        idsToPublish = retryIds;
        allErrors.clear();
        LOG.info("Retrying {} failed inserts to BigQuery", rowsToPublish.size());
      }
      if (!allErrors.isEmpty()) {
        throw new IOException("Insert failed: " + allErrors);
      } else {
        return retTotalDataSize;
      }
    }



    /**
     * Convenience overload of {@code insertAll} that does not take a back-off or a sleeper.
     *
     * <p>It forwards every argument unchanged to the overload that does, supplying a back-off
     * obtained from {@code INSERT_BACKOFF_FACTORY} (adapted through {@code
     * BackOffAdapter.toGcpBackOff}) and {@link Sleeper#DEFAULT}, and returns that overload's result.
     */
    @Override
    public <T> long insertAll(
        TableReference ref,
        List<ValueInSingleWindow<TableRow>> rowList,
        @Nullable List<String> insertIdList,
        InsertRetryPolicy retryPolicy,
        List<ValueInSingleWindow<T>> failedInserts,
        ErrorContainer<T> errorContainer,
        boolean skipInvalidRows,
        boolean ignoreUnknownValues)
        throws IOException, InterruptedException {
      // A fresh back-off is created for every call.
      BackOff insertBackoff = BackOffAdapter.toGcpBackOff(INSERT_BACKOFF_FACTORY.backoff());
      return insertAll(
          ref,
          rowList,
          insertIdList,
          insertBackoff,
          Sleeper.DEFAULT,
          retryPolicy,
          failedInserts,
          errorContainer,
          skipInvalidRows,
          ignoreUnknownValues);
    }



    /**
     * Issues a table patch request for {@code tableReference} whose body is a {@link Table} that
     * carries only {@code tableDescription}.
     *
     * <p>The request is run through {@code executeWithRetries} with {@link Sleeper#DEFAULT}, the
     * default back-off and the {@code ALWAYS_RETRY} predicate; its result is returned as-is.
     */
    @Override
    public Table patchTableDescription(
        TableReference tableReference, @Nullable String tableDescription)
        throws IOException, InterruptedException {
      // Only the description is set on the patch body.
      Table descriptionPatch = new Table();
      descriptionPatch.setDescription(tableDescription);

      Tables.Patch patchRequest =
          client
              .tables()
              .patch(
                  tableReference.getProjectId(),
                  tableReference.getDatasetId(),
                  tableReference.getTableId(),
                  descriptionPatch);
      String failureMessage =
          String.format(
              "Unable to patch table description: %s, aborting after %d retries.",
              tableReference, MAX_RPC_RETRIES);

      return executeWithRetries(
          patchRequest, failureMessage, Sleeper.DEFAULT, createDefaultBackoff(), ALWAYS_RETRY);
    }

  }



  /**
   * Retry predicate that answers {@code false} exactly when a new {@link ApiErrorExtractor}
   * reports the exception as "item not found", and {@code true} for every other exception.
   */
  static final SerializableFunction<IOException, Boolean> DONT_RETRY_NOT_FOUND =
      input -> !new ApiErrorExtractor().itemNotFound(input);



  /** Retry predicate that answers {@code true} for every exception it is given. */
  static final SerializableFunction<IOException, Boolean> ALWAYS_RETRY = ignored -> Boolean.TRUE;



  /**
   * Executes {@code request}, retrying after an {@link IOException} for as long as {@code
   * shouldRetry} approves the exception and {@code backoff} has not been exhausted.
   *
   * @param request the request to execute
   * @param errorMessage message of the {@link IOException} thrown once attempts stop
   * @param sleeper used to wait between attempts
   * @param backoff decides how long to wait and when to stop retrying
   * @param shouldRetry returns {@code false} to stop immediately for a given exception
   * @return the value produced by the first successful {@code request.execute()}
   * @throws IOException carrying {@code errorMessage}, with the last caught exception as its cause
   * @throws InterruptedException if interrupted while waiting between attempts
   */
  @VisibleForTesting
  static <T> T executeWithRetries(
      AbstractGoogleClientRequest<T> request,
      String errorMessage,
      Sleeper sleeper,
      BackOff backoff,
      SerializableFunction<IOException, Boolean> shouldRetry)
      throws IOException, InterruptedException {
    IOException mostRecentFailure = null;
    while (true) {
      try {
        return request.execute();
      } catch (IOException e) {
        mostRecentFailure = e;
      }
      // The predicate is consulted before logging, so a vetoed exception is never logged here.
      if (!shouldRetry.apply(mostRecentFailure)) {
        break;
      }
      LOG.info("Ignore the error and retry the request.", mostRecentFailure);
      // Sleeps for the next back-off interval; a false result means the back-off is exhausted.
      if (!nextBackOff(sleeper, backoff)) {
        break;
      }
    }
    throw new IOException(errorMessage, mostRecentFailure);
  }



  /**
   * Delegates to {@link BackOffUtils#next}, rethrowing its checked {@link IOException} wrapped in
   * a {@link RuntimeException} so callers only have to deal with {@link InterruptedException}.
   */
  private static boolean nextBackOff(Sleeper sleeper, BackOff backoff) throws InterruptedException {
    final boolean keepGoing;
    try {
      keepGoing = BackOffUtils.next(sleeper, backoff);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    return keepGoing;
  }



  /**
   * Builds a {@link Bigquery.Builder} configured from {@code options}: credentials, HTTP write
   * timeout, application name and Google client request initializer are all read from it.
   */
  private static Bigquery.Builder newBigQueryClient(BigQueryOptions options) {
    // Do not log 404. It clutters the output and is possibly even required by the caller.
    RetryHttpRequestInitializer retryInitializer =
        new RetryHttpRequestInitializer(ImmutableList.of(404));
    retryInitializer.setCustomErrors(createBigQueryClientCustomErrors());
    retryInitializer.setWriteTimeout(options.getHTTPWriteTimeout());

    Bigquery.Builder clientBuilder =
        new Bigquery.Builder(
            Transport.getTransport(),
            Transport.getJsonFactory(),
            chainHttpRequestInitializer(options.getGcpCredential(), retryInitializer));
    clientBuilder.setApplicationName(options.getAppName());
    clientBuilder.setGoogleClientRequestInitializer(options.getGoogleApiTrace());
    return clientBuilder;
  }



  /**
   * Returns a {@link ChainingHttpRequestInitializer} that applies a credential initializer
   * followed by {@code httpRequestInitializer}.
   *
   * <p>When {@code credential} is {@code null} a {@link NullCredentialInitializer} is used in its
   * place; otherwise the credential is wrapped in an {@link HttpCredentialsAdapter}.
   */
  private static HttpRequestInitializer chainHttpRequestInitializer(
      Credentials credential, HttpRequestInitializer httpRequestInitializer) {
    HttpRequestInitializer credentialInitializer =
        (credential != null)
            ? new HttpCredentialsAdapter(credential)
            : new NullCredentialInitializer();
    return new ChainingHttpRequestInitializer(credentialInitializer, httpRequestInitializer);
  }



  /**
   * Creates the {@link CustomHttpErrors} used by the BigQuery client: a single rule that attaches
   * a rate-limit hint to HTTP 403 responses whose URL contains {@code "/tables?"}.
   */
  public static CustomHttpErrors createBigQueryClientCustomErrors() {
    // Targets 403 errors on table listing, i.e. URLs shaped like:
    // http://www.googleapis.com/bigquery/v2/projects/myproject/datasets/
    //     mydataset/tables?maxResults=1000
    final int forbiddenStatus = 403;
    final String tablesListUrlFragment = "/tables?";
    final String rateLimitHint =
        "The GCP project is most likely exceeding the rate limit on "
            + "bigquery.tables.list, please find the instructions to increase this limit at: "
            + "https://cloud.google.com/service-infrastructure/docs/rate-limiting#configure";

    CustomHttpErrors.Builder customErrors = new CustomHttpErrors.Builder();
    customErrors.addErrorForCodeAndUrlContains(
        forbiddenStatus, tablesListUrlFragment, rateLimitHint);
    return customErrors.build();
  }



  @Experimental(Experimental.Kind.SOURCE_SINK)

  static class StorageClientImpl implements StorageClient {



    private static final HeaderProvider USER_AGENT_HEADER_PROVIDER =

        FixedHeaderProvider.create(

            "user-agent", "Apache_Unified_Java/" + ReleaseInfo.getReleaseInfo().getVersion());



    private final BigQueryStorageClient client;



    private StorageClientImpl(BigQueryOptions options) throws IOException {

      BigQueryStorageSettings settings =

          BigQueryStorageSettings.newBuilder()

              .setCredentialsProvider(FixedCredentialsProvider.create(options.getGcpCredential()))

              .setTransportChannelProvider(

                  BigQueryStorageSettings.defaultGrpcTransportProviderBuilder()

                      .setHeaderProvider(USER_AGENT_HEADER_PROVIDER)

                      .build())

              .build();

      this.client = BigQueryStorageClient.create(settings);

    }



    @Override

    public ReadSession createReadSession(CreateReadSessionRequest request) {

      return client.createReadSession(request);

    }



    @Override

    public Iterable<ReadRowsResponse> readRows(ReadRowsRequest request) {

      return client.readRowsCallable().call(request);

    }



    @Override

    public void close() {

      client.close();

    }

  }



  /**
   * An {@link ExecutorService} decorator that wraps every task so that it must hold a permit from
   * a {@link Semaphore} created with {@code parallelism} permits while it runs.
   *
   * <p>Lifecycle methods are forwarded to the wrapped executor unchanged. Task-submitting methods
   * wrap each task in {@link SemaphoreRunnable} or {@link SemaphoreCallable} before forwarding it.
   */
  private static class BoundedExecutorService implements ExecutorService {
    /** The executor that actually runs the wrapped tasks. */
    private final ExecutorService executor;

    /** Permits that wrapped tasks acquire before running and release afterwards. */
    private final Semaphore semaphore;

    /** Number of permits the semaphore was created with. */
    private final int parallelism;

    /**
     * @param executor the executor to delegate to
     * @param parallelism initial number of semaphore permits
     */
    BoundedExecutorService(ExecutorService executor, int parallelism) {
      this.executor = executor;
      this.parallelism = parallelism;
      this.semaphore = new Semaphore(parallelism);
    }

    @Override
    public void shutdown() {
      executor.shutdown();
    }

    /**
     * Shuts the wrapped executor down immediately and then floods the semaphore with permits, so
     * that wrapped tasks are no longer held back by it.
     *
     * @return the runnables returned by the wrapped executor's {@code shutdownNow()}
     */
    @Override
    public List<Runnable> shutdownNow() {
      List<Runnable> runnables = executor.shutdownNow();
      // try to release permits as many as possible before returning semaphored runnables.
      synchronized (this) {
        // The guard keeps a repeated call from releasing again, which would overflow the
        // semaphore's permit count.
        if (semaphore.availablePermits() <= parallelism) {
          semaphore.release(Integer.MAX_VALUE - parallelism);
        }
      }
      return runnables;
    }

    @Override
    public boolean isShutdown() {
      return executor.isShutdown();
    }

    @Override
    public boolean isTerminated() {
      return executor.isTerminated();
    }

    @Override
    public boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedException {
      return executor.awaitTermination(l, timeUnit);
    }

    @Override
    public <T> Future<T> submit(Callable<T> callable) {
      return executor.submit(new SemaphoreCallable<>(callable));
    }

    @Override
    public <T> Future<T> submit(Runnable runnable, T t) {
      return executor.submit(new SemaphoreRunnable(runnable), t);
    }

    @Override
    public Future<?> submit(Runnable runnable) {
      return executor.submit(new SemaphoreRunnable(runnable));
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> collection)
        throws InterruptedException {
      return executor.invokeAll(
          collection.stream().map(SemaphoreCallable::new).collect(Collectors.toList()));
    }

    @Override
    public <T> List<Future<T>> invokeAll(
        Collection<? extends Callable<T>> collection, long l, TimeUnit timeUnit)
        throws InterruptedException {
      return executor.invokeAll(
          collection.stream().map(SemaphoreCallable::new).collect(Collectors.toList()),
          l,
          timeUnit);
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> collection)
        throws InterruptedException, ExecutionException {
      return executor.invokeAny(
          collection.stream().map(SemaphoreCallable::new).collect(Collectors.toList()));
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> collection, long l, TimeUnit timeUnit)
        throws InterruptedException, ExecutionException, TimeoutException {
      return executor.invokeAny(
          collection.stream().map(SemaphoreCallable::new).collect(Collectors.toList()),
          l,
          timeUnit);
    }

    @Override
    public void execute(Runnable runnable) {
      executor.execute(new SemaphoreRunnable(runnable));
    }

    /** Runs the wrapped {@link Runnable} while holding one permit of the enclosing semaphore. */
    private class SemaphoreRunnable implements Runnable {
      private final Runnable runnable;

      SemaphoreRunnable(Runnable runnable) {
        this.runnable = runnable;
      }

      /**
       * Acquires a permit, runs the wrapped task and always releases the permit afterwards.
       *
       * @throws RuntimeException if interrupted while acquiring the permit; the wrapped task is
       *     then not run, the {@link InterruptedException} is attached as the cause and the
       *     thread's interrupt status is restored
       */
      @Override
      public void run() {
        try {
          semaphore.acquire();
        } catch (InterruptedException e) {
          // acquire() clears the interrupt status when it throws; set it again so code further
          // up the stack can still observe the interruption, and keep the cause for diagnostics.
          Thread.currentThread().interrupt();
          throw new RuntimeException("semaphore acquisition interrupted. task canceled.", e);
        }
        try {
          runnable.run();
        } finally {
          semaphore.release();
        }
      }
    }

    /** Runs the wrapped {@link Callable} while holding one permit of the enclosing semaphore. */
    private class SemaphoreCallable<V> implements Callable<V> {
      private final Callable<V> callable;

      SemaphoreCallable(Callable<V> callable) {
        this.callable = callable;
      }

      /**
       * Acquires a permit, calls the wrapped task and always releases the permit afterwards. An
       * {@link InterruptedException} raised while acquiring propagates to the caller unchanged.
       */
      @Override
      public V call() throws Exception {
        semaphore.acquire();
        try {
          return callable.call();
        } finally {
          semaphore.release();
        }
      }
    }
  }

}